package groupOne.app.DWD.db;

import groupOne.app.BaseAppSql;
import groupOne.common.Constant;
import groupOne.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwdInteractionComment_huangyanhui extends BaseAppSql {

    public static void main(String[] args) {
        new DwdInteractionComment_huangyanhui().init(
                4000,
                2,
                "DwdInteractionComment_huangyanhui"
        );
    }
    @Override
    protected void handle(StreamExecutionEnvironment env,
                          StreamTableEnvironment tEnv) {
        // 1. 读取ods_db
        readOdsDb(tEnv,"DwdInteractionComment_huangyanhui");
        // 2.读取评论表的数据
        Table commentInfo = tEnv.sqlQuery("select " +
                        "data['id'] id, " +
                        "data['user_id'] user_id, " +
                        "data['chapter_id'] chapter_id, " +
                        "data['course_id'] course_id, " +
                        "data['comment_txt'] comment_txt, " +
                        "data['create_time'] create_time, " +
                        "data['deleted'] deleted, " +
                        "ts " +
                        "from ods_db " +
                        "where `table` = 'comment_info' " +
                        "and `type` = 'insert'");

        //3.将结果写入kafka
        tEnv.executeSql("create table dwd_interaction_comment(" +
                        "id string, " +
                        "user_id string, " +
                        "chapter_id string, " +
                        "course_id string, " +
                        "comment_txt string, " +
                        "create_time string, " +
                        "deleted string, " +
                        "ts bigint " +
                        ")" + SQLUtil.getKafkaSink(Constant.TOPIC_DWD_INTERACTION_COMMENT)
                );
        commentInfo.executeInsert("dwd_interaction_comment");

    }
}
